[SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786
[SPARK-57760][STREAMING] Make small optimizations to StatefulProcessorApiClient#56786funrollloops wants to merge 3 commits into
Conversation
HyukjinKwon
left a comment
There was a problem hiding this comment.
2 blocking, 0 non-blocking, 1 nit.
The closure-hoisting optimization is good; the serializer swap and the fast-path subclass leak read as unintended behavior changes for a "small optimizations" PR.
Correctness (2)
- stateful_processor_api_client.py:110:
PickleSerializer()drops cloudpickle —CPickleSerializerdefaults toCloudPickleSerializer— see inline - stateful_processor_api_client.py:45: scalar fast-path returns
np.float64/pd.Timestampunconverted (they subclassfloat/datetime) — see inline
Nits: 1 minor item (see inline comments).
Verification
Confirmed empirically: issubclass(np.float64, float) and issubclass(pandas.Timestamp, datetime) are both True, while np.int64/np.bool_ are not subclasses of int/bool. So the new fast-path leaks exactly np.float64 and pd.Timestamp (returned as-is, skipping .tolist()/.to_pydatetime()), while np.int64/np.bool_ still correctly reach the np.generic branch.
PR description suggestions
- Document: why the serializer is changed from
CPickleSerializertoPickleSerializer(this is a capability change, not just an optimization) — and whether dropping cloudpickle is intended. - Add: a real JIRA id (currently
SPARK-?????) and a short note on what is being optimized and how it was measured.
Benchmarked a tuple vs a frozenset, and the tuple came out noticeably faster.
|
Hi @HyukjinKwon, I'm waiting on my Jira account to be created. I'll create a Jira ticket for this PR once I am able. Attached is the script I used to benchmark the _serialize_to_bytes function. |
What changes were proposed in this pull request?
Make two small optimizations to StatefulProcessorApiClient:
mapto generator comprehensions, and move the numpy import and function definition to the top level so it is done onceBenchmarks
This is a microbenchmark for
_serialize_to_bytes:Before
After
Why are the changes needed?
Together these changes improve transform with state on a simple rolling-window style benchmark by ~10%.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing unit tests.
Was this patch authored or co-authored using generative AI tooling?
No, but Claude was consulted in the process of producing this PR.